{
  "cells": [
    {
      "cell_type": "markdown",
      "id": "Td_alkbOv3Aj",
      "metadata": {
        "id": "Td_alkbOv3Aj"
      },
      "source": [
        "# Spark RAPIDS Parquet acceleration\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "c6ed860b",
      "metadata": {
        "id": "c6ed860b"
      },
      "source": [
        "<a target=\"_blank\" href=\"https://colab.research.google.com/github/NVIDIA/spark-rapids-examples/blob/main/examples/SQL%2BDF-Examples/demo/Spark_parquet_microkernels.ipynb\">\n",
        "  <img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/>\n",
        "</a>\n"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "AhUsdz6jLdMi",
      "metadata": {
        "id": "AhUsdz6jLdMi"
      },
      "source": [
        "\n",
        "Before getting started - be sure to change your runtime to use a GPU Hardware accelerator! Use the Runtime -> \"Change runtime type\" menu option to add a GPU."
      ]
    },
    {
      "cell_type": "markdown",
      "id": "ZfNDlz0SM0DB",
      "metadata": {
        "id": "ZfNDlz0SM0DB"
      },
      "source": [
        "# Let's get started using the RAPIDS Accelerator for Apache Spark"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "PzW61-K04A1E",
      "metadata": {
        "id": "PzW61-K04A1E"
      },
      "outputs": [],
      "source": [
        "!nvidia-smi"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "!cat /proc/cpuinfo"
      ],
      "metadata": {
        "id": "OIEun51OCyC4"
      },
      "id": "OIEun51OCyC4",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "spark_version='3.5.0'\n",
        "rapids_version='24.12.0'"
      ],
      "metadata": {
        "id": "NEGt46X7nEqf"
      },
      "id": "NEGt46X7nEqf",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "%pip install --quiet \\\n",
        "  pyspark=={spark_version}"
      ],
      "metadata": {
        "id": "g9XK28gcnHiG"
      },
      "id": "g9XK28gcnHiG",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "from importlib.resources import files\n",
        "from pyspark.sql import SparkSession\n",
        "import glob\n",
        "import os\n",
        "import re\n",
        "import time\n",
        "import statistics"
      ],
      "metadata": {
        "id": "gr2msGD1nLh-"
      },
      "id": "gr2msGD1nLh-",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "pyspark_files = files('pyspark')\n",
        "spark_sql_jar_path, *_ = glob.glob(f\"{pyspark_files}/*/spark-sql_*jar\")\n",
        "spark_sql_jar = os.path.basename(spark_sql_jar_path)\n",
        "scala_version = re.search(r'^spark-sql_(\\d+.\\d+)-.*\\.jar$', spark_sql_jar).group(1)"
      ],
      "metadata": {
        "id": "0uXK6z8KoFUt"
      },
      "id": "0uXK6z8KoFUt",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "spark = (\n",
        "    SparkSession.builder\n",
        "      .appName('Parquet Spark GPU Acceleration')\n",
        "      .master('local[*]')\n",
        "      .config('spark.driver.memory', '5g')\n",
        "      .config('spark.plugins', 'com.nvidia.spark.SQLPlugin')\n",
        "      .config('spark.jars.packages', f\"com.nvidia:rapids-4-spark_{scala_version}:{rapids_version}\")\n",
        "      .getOrCreate()\n",
        ")\n",
        "spark"
      ],
      "metadata": {
        "id": "ayT5VJQvnQv4"
      },
      "id": "ayT5VJQvnQv4",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "location = \"./TMP_DATA\"\n",
        "iters = 5"
      ],
      "metadata": {
        "id": "3VsYyTATpNG1"
      },
      "id": "3VsYyTATpNG1",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "from pyspark.sql.types import IntegerType, StringType, StructType, StructField\n",
        "from pyspark.sql import functions as F\n",
        "import random\n",
        "import string\n",
        "\n",
        "# Define schema\n",
        "schema = StructType([\n",
        "    StructField(\"id\", IntegerType(), False),\n",
        "    StructField(\"name\", StringType(), False),\n",
        "    StructField(\"age\", IntegerType(), False),\n",
        "    StructField(\"salary\", IntegerType(), False)\n",
        "])\n",
        "\n",
        "# Function to generate random strings\n",
        "def random_string(length=10):\n",
        "    return ''.join(random.choices(string.ascii_letters, k=length))\n",
        "\n",
        "# Generate DataFrame with 20M rows\n",
        "df = spark.range(0, 20_000_000).toDF(\"id\") \\\n",
        "    .withColumn(\"name\", F.udf(lambda: random_string(), StringType())()) \\\n",
        "    .withColumn(\"age\", (F.rand() * 50 + 20).cast(IntegerType())) \\\n",
        "    .withColumn(\"salary\", (F.rand() * 100000 + 30000).cast(IntegerType()))\n",
        "\n",
        "df.write.mode(\"overwrite\").parquet(location)"
      ],
      "metadata": {
        "id": "diUi3mxWh91X"
      },
      "id": "diUi3mxWh91X",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Run the Parquet scan test on the GPU\n",
        "spark.conf.set(\"spark.rapids.sql.enabled\",True)\n",
        "gpu_times = []\n",
        "for i in range(iters):\n",
        "    start = time.time()\n",
        "    df = spark.read.parquet(location).selectExpr(\"count(name) as rows\", \"avg(salary) as average_salary\", \"median(salary) as median_salary\", \"sum(salary) as total_salary\", \"avg(age) as average_age\", \"median(age) as median_age\")\n",
        "    if i == 0:\n",
        "      df.show()\n",
        "    else:\n",
        "      df.collect()\n",
        "    end = time.time()\n",
        "    gpu_times.append(end - start)\n",
        "\n",
        "gpu_median = statistics.median(gpu_times)\n",
        "\n",
        "print(f\"Median execution time of {iters} runs for GPU Parquet scan: {gpu_median:.3f}\")"
      ],
      "metadata": {
        "id": "iXaXVgBNt4pK"
      },
      "id": "iXaXVgBNt4pK",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# Run the Parquet scan test on the CPU\n",
        "spark.conf.set(\"spark.rapids.sql.enabled\",False)\n",
        "cpu_times = []\n",
        "for i in range(iters):\n",
        "    start = time.time()\n",
        "    df = spark.read.parquet(location).selectExpr(\"count(name) as rows\", \"avg(salary) as average_salary\", \"median(salary) as median_salary\", \"sum(salary) as total_salary\", \"avg(age) as average_age\", \"median(age) as median_age\")\n",
        "    if i == 0:\n",
        "      df.show()\n",
        "    else:\n",
        "      df.collect()\n",
        "    end = time.time()\n",
        "    cpu_times.append(end - start)\n",
        "\n",
        "cpu_median = statistics.median(cpu_times)\n",
        "print(f\"Median execution time of {iters} runs for CPU Parquet scan: {cpu_median:.3f}\")"
      ],
      "metadata": {
        "id": "lUmVe12Wic5X"
      },
      "id": "lUmVe12Wic5X",
      "execution_count": null,
      "outputs": []
    },
    {
      "cell_type": "code",
      "source": [
        "# GPU speedup should be in the range of 5-10x\n",
        "speedup = cpu_median / gpu_median\n",
        "print(f\"GPU speedup: {speedup:.2f}x\")"
      ],
      "metadata": {
        "id": "CxROFk_AoQQl"
      },
      "id": "CxROFk_AoQQl",
      "execution_count": null,
      "outputs": []
    }
  ],
  "metadata": {
    "accelerator": "GPU",
    "colab": {
      "provenance": []
    },
    "gpuClass": "standard",
    "kernelspec": {
      "display_name": "Python 3.9.12 ('base')",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.9.12"
    },
    "vscode": {
      "interpreter": {
        "hash": "5327a248d9883bedf47bfd9e608af95bf318797e621edcc550c6b5b3fdc820cc"
      }
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
